{
  "metadata" : {
    "name" : "Getting Started with Spark and Cassandra",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#Getting Started with Spark and Cassandra"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "##This notebook shows you how to get Spark and Cassandra working together. \nTo try this notebook you will need an existing Cassandra instance, either a local server or a cluster.\nWe require the ip address of one seed node from that Cassandra single server or cluster.\n\n_This notebook is an adaptation of the [Quick Stark Guide](https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md) from the [Spark Cassandra Connector Repository](https://github.com/datastax/spark-cassandra-connector)_"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### First, we add the dependency to the Spark Cassandra Connector."
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "_To download our dependencies, we can first setup a local directory as our temporary dependency repository. This is a Spark-Notebook specific requirement._"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : ":local-repo /tmp/repo",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Add the dependency to the Spark Cassandra Connector that matches the Spark and Cassandra versions being used.\nCheck the [version compatibility table](https://github.com/datastax/spark-cassandra-connector#version-compatibility) to pick the right dependency for your project:"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : ":dp com.datastax.spark % spark-cassandra-connector_2.10 % 1.4.0-M3",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Configure the Cassandra Address\nFor this step, we need to know the address of the Cassandra cluster (or local instance) that we will be using with this notebook. Please use a valid IP address or node name reachable from the node running the Spark Notebook _Server_ instance "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val cassandraHost = <cassandra-host>",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "We add the cassandra address to the Spark configuration.\nWhen using plain Spark, it's sufficient to add the Cassandra endpoint to the [Spark Config](http://spark.apache.org/docs/latest/configuration.html), like this:\n```Scala\nsparkConfig.set(\"spark.cassandra.connection.host\", cassandraHost)\n```\n_In the Spark Notebook, we need to do this using the reset context function, so that the built-in Spark context can be reconstructed using the new configuration. Here we can also change other Spark config parameters as needed._"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "reset(\"Getting Stated with Spark and Cassandra Notebook\", lastChanges = (c:SparkConf) => {\n  c.set(\"spark.cassandra.connection.host\", cassandraHost)\n})",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "> **Remark**\n> \n> The four previous cells wouldn't have been required if the notebook's metadata is used\n> \n> That is, if you open in the menu: `Edit > Edit Notebook Metadata`, you can update the custom parameters this way:\n> ```json\n> \"customLocalRepo\": \"/tmp/repo\",\n> \"customdeps\": [\n>   \"com.datastax.spark % spark-cassandra-connector_2.10 % 1.4.0-M3\"\n> ],\n> \"customSparkConf\" : {\n>   \"spark.cassandra.connection.host\": <cassandra-host>\n> }\n> ```"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "##Create a keyspace and a table for our test"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val createKeyspace = \"CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };\" \nval createTable = \"CREATE TABLE IF NOT EXISTS test.kv(key text PRIMARY KEY, value int);\"",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "_We are going to use the Cassandra connector in \"manual\" mode to execute those statements_\nFor that, we first need the Spark Conf object that now contains all information we need to connect to Cassandra"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val sparkConf = sparkContext.getConf",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import com.datastax.spark.connector.cql.CassandraConnector\nval cassandraConnector = CassandraConnector(sparkConf) ",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "###Using the connector, execute the create statements"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "cassandraConnector.withSessionDo { session =>\n  session.execute(createKeyspace)\n  session.execute(createTable)\n}",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "###Insert some sample data"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "CassandraConnector(sparkConf).withSessionDo { session =>\n  for (i <- 0 until 100) {\n    val insertStatement = s\"INSERT INTO test.kv(key, value) VALUES ('key${i}', $i);\"\n    session.execute(insertStatement)\n  }\n}",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "##Enable Cassandra-specific functions on the SparkContext and RDD:"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import com.datastax.spark.connector._",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#Loading and analyzing data from Cassandra\nUse the `sc.cassandraTable` method to view this table as a Spark `RDD`:\n_This method will issue a full table load into Spark_"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val rdd = sc.cassandraTable(\"test\", \"kv\")",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "###We can explore the data with Spark \n- count all elements"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "rdd.count",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "- get the first element"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "rdd.first",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "- sum all the values"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "rdd.map(_.getInt(\"value\")).sum",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "- visualize the data"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "rdd.map(row =>  row.getInt(\"value\")).take(100)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#Add Data from Spark to Cassandra\n###Create a new collection of data"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "output_stream_collapsed" : true,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val moreData = for (i <- 100 until 200) yield ((s\"addedKey$i\"),i) ",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "###Convert the collection into an RDD, \"parallelizing\" it"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val moreDataRdd = sparkContext.parallelize(moreData) ",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Store in Cassandra using the implicit methods on RDD"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "moreDataRdd.saveToCassandra(\"test\",\"kv\")",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Verify that the added data is there\n_Note that we don't need to re-declare the RDD. An action, such as `count` will trigger a re-computation of the RDD and therefore the new data will be taken into account_"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "rdd.count",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}